[Firebase] ReactiveX(RxJava,RxSwift,RxJS) を使ったFirestoreのクライアント実装
背景
前職でコミュニケーションサービスをゼロから立ち上げるにあたって、がっつりFiretoreを実戦投入しました。その際に調べた事や挑戦、ノウハウなどをブログとして残そうと思った次第です。
一からの入門記事ではございませんので、そこのトコロはご了承下さい。分かる方はコードブロックだけ眺めて下さればOKです。
APIサーバレスなアーキテクチャ
Firestoreの(ここで触れておきたい)特徴
- 分散DB
- クライアントはDBに直接接続する
- クライアントはローカルにもDBを持つ(単なるReadキャッシュではない)
その他は公式を参照するなりしてください。
困り事
上述のようにクライアントはDBサーバに直接接続する構成を採るため、APIサーバのような抽象化レイヤが介在できません。そのため、抽象化のための実装は、各クライアント上に実現する事になります。つまり
- APIサーバでやってた処理がクライアントのModel層へ
- クライアントの種類の分だけ実装が必要
になります。じゃあ、とAndroid、iOS、Webの三つ分をそれぞれ別個に実装していると、後の管理・保守工数含めて大変な事になってしまいますよね? Firestoreは後述する要素も含めて、非常に魅力的なアーキテクチャなのですが、この現実は頂けません。対策必須です。
我々はこの問題に対処するため、 ReactiveX (Reactive Extentions) を採用しました。ReactiveXを使うとどうなるか、それを具体的に紹介するのがこの記事の目的です。
ReactiveXでModel層を(出来るだけ)統一的に実装する
ReactiveXに関しては詳細は公式等参照なりググるなりして頂くとして、
- 各言語毎に実装があり、そこで使用するオペレータ(の名称や、それが実現する処理の内容)などは同じ
- よって類似性の高いコードを書くことが出来る
- それによりトータルの開発工数や管理工数を削減出来る
という三段論法です(加えて四段目として、Firestoreのクライアント実装にReactiveXを使えば色々便利だよ、というのもあります)。
では早速
コード
1. クライアント毎にObservable化する
公式にある通り、クライアントはFirestoreのドキュメントやコレクションを直接リッスンします。これにより、対象がアップデートされる都度、データを受け取る事が出来ます。これを一連のデータストリームとして取り扱うのが、ここでの主眼となります。
import com.google.firebase.firestore.* import io.reactivex.Observable import io.reactivex.disposables.Disposables class CollectionStream { companion object { fun <T: Any> from(query: Query, documentClass: Class<T>): Observable<List<T>> { return Observable.create { emitter -> val registration = query.addSnapshotListener { snapshot, error -> if (error != null) { emitter.onError(error) } else { val list = snapshot!!.documents.map { d -> try { d.toObject(documentClass) } catch (e: Exception) { null } }.filterNotNull() emitter.onNext(list) } } emitter.setDisposable(Disposables.fromRunnable { registration.remove() }) } }
import Foundation import FirebaseFirestore import RxSwift protocol Document { init?(id: String, data: [String : Any]) } class CollectionStream { static func from<T: Document>(query: Query, documentClass: T.Type) -> Observable<[T]> { return Observable.create { observer in let listener = query.addSnapshotListener() { (snapshot, err) in if let err = err { observer.on(.error(err)) return } let list: [T] = snapshot!.documents .map { doc in T(id: doc.documentID, data: doc.data()) } .compactMap { $0 } // remove nil observer.on(.next(list)) } return Disposables.create { listener.remove() } } }
import firebase from 'firebase/app' import 'firebase/firestore' import { Observable } from 'rxjs'; class CollectionStream { static from(query) { return Observable.create(observer => { let dispose; dispose = query.onSnapshot(snapshot => { observer.next(snapshot.docs.map(doc => ({ id: doc.id, data: doc.data() }))); }, error => { observer.error(error) }); return () => { dispose(); } }); }
さてこうして取り出したObservableには、クラウド(またはローカル※)でデータがUpdateされる都度、そのデータが流れてきます。これらの Observable を subscribe()
する事で、アップデートをリアルタイムで取得出来るようになりました。
2. Model層で色々なIFを用意する
ここでは、ReactiveXの色々なオペレータを使って、目的に応じてデータストリームを加工・合成等してゆきます。
例1. チャットルームでの発言回数をSUMる
Firestoreのサンプルと言えばチャット、という事で、ここでもそれに倣いましょう。”チャットの全ての発言のリスト”を取得するオリジナルのデータストリームがあるとして、ReactiveXではそれをベースに、自分の発言回数を取得するストリームを新たに作成します。
class SampleModel { companion object { fun getMessagesOf(roomId: String): Observable<List<Message>> { val query = FirebaseFirestore.getInstance() .collection("chatrooms/$roomId/messages") .orderBy("createdAt", Query.Direction.DESCENDING) return CollectionStream.from(query, Message::class.java) } fun getSpeakCountAt(roomId: String): Observable<Int> { return getMessagesOf(roomId) .map { messages -> messages.filter { message -> message.ownerUid == "self-uid" } } .map { it.count() } } }
class SampleModel { static func getMessagesOf(roomId: String) -> Observable<[Message]> { let query = Firestore.firestore() .collection("chatrooms/\(roomId)/messages") .order(by: "createdAt", descending: true) return CollectionStream.from(query: query, returningClass: Message.self) } static func getSpeakCountAt(roomId: String) -> Observable<Int> { return getMessagesOf(roomId: roomId) .map { messages in messages.filter { message in message.ownerUid == "self-uid" } } .map { $0.count } }
import { map } from "rxjs/operators"; class SampleModel { static getMessagesOf(roomId) { const query = firebase.firestore() .collection(`chatrooms/${roomId}/messages`) .orderBy("createdAt", "desc"); return CollectionStream.from(query); } static getSpeakCountAt(roomId) { return self.getMessagesOf(roomId) .pipe( map(messages => { return messages.filter(d => d.data["ownerUid"] === "self-uid"); }), map(list => list.size()), ) }
またLimit無しに全件取得するようなQueryは、言うまでもなく乱暴です。ページングや定時バッチので集計と組み合わせる等、上手く設計しましょう。
続けてもう一つサンプル。
例2. 発言者の名前をリアルタイムに解決する(非正規化なしで)
吹き出しの横に発言者の名前を表示したいです。どうしますか?
import io.reactivex.functions.BiFunction class SampleModel { companion object { fun getMembersOf(roomId: String): Observable<List<Member>> { val query = FirebaseFirestore.getInstance().collection("chatrooms/$roomId/members") return CollectionStream.from(query, Member::class.java) } fun getMessagesAndOwnersOf(roomId: String): Observable<List<Pair<Message, Member?>>> { val messagesStream = getMessagesOf(roomId) val membersStream = getMembersOf(roomId) return Observable.combineLatest(messagesStream, membersStream, BiFunction { messages, members -> messages.map { msg -> val owner = members.find { it.id == msg.ownerUid } Pair(msg, owner) } }) }
class SampleModel { static func getMembersOf(roomId: String) -> Observable<[Member]> { let query = Firestore.firestore().collection("chatrooms/\(roomId)/members") return CollectionStream.from(query: query, documentClass: Member.self) } static func getMessagesAndOwnersOf(roomId: String) -> Observable<[(Message, Member?)]> { let messagesStream = getMessagesOf(roomId: roomId) let membersStream = getMembersOf(roomId: roomId) return Observable.combineLatest(messagesStream, membersStream) { messages, members in messages.map { msg in let owner = members.first { $0.id == msg.ownerUid } return (msg, owner) } } }
import { combineLatest } from "rxjs"; class SampleModel { static getMembersOf(roomId) { const query = firebase.firestore().collection(`chatrooms/${roomId}/members`); return CollectionStream.from(query); } static getMessagesAndOwnersOf(roomId) { return combineLatest(SampleModel.getMessagesOf(roomId), SampleModel.getMembersOf(roomId)) .pipe( map(([ messages, members ]) => { return messages.map(msg => { const member = members.find(m => m.id === msg.data["ownerUid"]); return { message: msg, owner: member, } }) }), ) }
このように「クライアントサイドでJoin」すれば、非正規化せずとも、Membersストリームから常に最新のデータが取得できます。この手法ですと、Firestoreからデータを取得する際の自由度が大幅に上がるので、とてもお勧めです。
さて改めて各クライアントのコードをご覧頂けると、とても良く似ている、という印象を持って頂けたのではないでしょうか?
- 「一つの言語で開発してから横展開」が割と現実的
※ 実際に私は、IDEの型推測機能が優秀なAndroidStudio+Kotlin環境でまず書いて、それをXcode環境に持って行く(コピペ+α)、というスタイルで作業してました。 - 見比べれば、実装ロジックが違ってもすぐに気付く事が出来る
- 但しスレッド周りは除く
というのが私の評価です。ブログをご覧の方々も同じ感触を持って頂けると嬉しいのですが。
3. ViewでObservableをSubscribeする
さて、ここまで見てきたObservableの実装は、処理そのものではなく「データが流れてきたらこうするよ」という処理の定義です。実際にはSubscriber(主にView)がObservableを subscribe()
して初めて、Firestoreに対するListenが開始され、データが流れ始めます(ColdとかHotの話はここでは割愛)。
<script> import SampleModel from "../models/SampleModel"; (略) export default { name: 'SampleChat', props: [ "roomId" ], data() { return { room: {}, messages: [], speakCount: 0, subscriptions: [], } }, created() { this.subscriptions.push(SampleModel.getMessagesAndOwnersOf(this.roomId).subscribe(v => { this.messages = v; })); this.subscriptions.push(SampleModel.getSpeakCountAt(this.roomId).subscribe(v => { this.speakCount = v; })); this.subscriptions.push(SampleModel.getInfoOf(this.roomId).subscribe({ next: (v) => { this.room = v; }, error: (e) => console.log("error", e), })); }, beforeDestroy() { this.subscriptions.forEach(s => s.unsubscribe()); }, } </script> <template> <div> <h1>{{ room.data ? room.data.name : "" }}</h1> <ul> <li v-for="item in messages"> {{ item.message.data.text }} ({{ item.owner ? item.owner.data.name : "unknown" }}) </li> </ul> <p>あなたの発言回数:{{ speakCount }}</p> </div> </template>
だいぶテンプレ感溢れるコードになるので、VueRxをお勧めします。ライフサイクルに合わせて subscribe
とunsubscribe
も自動で行ってくれます。
初期化フェーズで Vue.use(VueRx)
しておいて下さい。
export default { name: 'SampleChat', props: [ "roomId" ], subscriptions() { return { messages: SampleModel.getMessagesAndOwnersOf(this.roomId).pipe(tap(v => console.log(v))), speakCount: SampleModel.getSpeakCountAt(this.roomId).pipe(tap(v => console.log(v))), room: SampleModel.getInfoOf(this.roomId).pipe( tap(v => console.log(v)), startWith({}), ), } },
これで、あなたや他の誰かが発言すれば直ぐに自動で、一覧や発言回数が更新されます。
ここまで来たら、発言機能も付けましょう。
4. 更新処理もObservableにする
データ更新系の処理も、データ取得系と同様にObservableとして実装する事が可能です。勿論必ずしもObservable化しなくともよいですが、プレチェックを行う際など、Firestoreのデータ状況を参照したい場合には、取得系のObservableと繋げる事で全体をスッキリ記述できます。
ここではネタとして、発言回数に制限を設けましょう。
import {map, mergeMap, take} from "rxjs/operators"; import {combineLatest, Observable, of} from "rxjs"; class SampleModel { static sendMessage(roomId, message) { const creation = Observable.create(observer => { const docRef = firebase.firestore().collection(`chatrooms/${roomId}/messages`).doc(); console.log("document will be created locally at first."); observer.next(docRef.id); // 新規ドキュメントのIDを返却してみる docRef.set({ text: message, ownerUid: UserSession.uid(), createdAt: firebase.firestore.FieldValue.serverTimestamp(), }).then(() => { console.log("document has synced to the cloud."); observer.complete(); }).catch(error => { observer.error(error) }); return () => {} // do nothing at unsubscribe() }); // このチャットでは、一人10回までしか発言できない!という仕様とする return this.getSpeakCountAt(roomId).pipe( take(1), mergeMap(count => { if (count <= 10) { return creation; } else { return of("speak count has reached to the limit."); } }) ); }
※ Firestoreの更新系処理のコールバック(あるいはPromiss.then)について注意すべき点は、オフライン状態だといつまでも呼ばれないという事です。このコールバックは `onSynced` とも言うべきもので、ローカルの更新がクラウドに同期されて初めて発火します。後続処理をこのコールバック内で実装すると、オフライン環境だと先に進めなくなりますので注意が必要です。
これはErrorに関しても同様で、例えばクラウド定義のRuleに抵触するような書き込みであっても、オフラインだとエラースローされません。この場合、オンラインに戻ったタイミングでドキュメントはローカルから削除されます。
export default { data() { form: { message: "", }, }, subscriptions() { ...略... }, methods: { sendMessage: function() { SampleModel.sendMessage(this.roomId, this.form.message).subscribe({ next: v => console.log(v), complete: () => {}, // nop error: e => console.error(e), }); this.form.message = ""; }, } } </script> <template> <div> 略 <p>あなたの発言回数:{{ speakCount }}</p> <p> <input type="text" v-model="form.message"><input type="button" value="send" @click="sendMessage"> </p> </div> </template>
この実装は、オフライン状態でも普通に動作しますので、是非お試し下さい。クラウドにSync出来なくとも、ローカルデータは追加されるので、そのUpdateを受けて一覧画面は更新されます(※)。
当然ながら、オンラインに復帰するまで他の人の発言は見えませんしその逆もそうですが、オフラインを理由にユーザの操作をブロックするよりも、操作自体は受け入れる方が、UXとしては良いのではと思います(但しその結果、一時的な不整合状態が発生するのを甘受する前提ですが)。
まとめ
Firestoreを採用する際の問題は、各クライアント毎に、正確な実装を行う必要がある事だと思います。単純に手数が増えるというだけでなく、各クライアントが全て正しく同じに実装されなければなりません。これは従来APIサーバ一カ所で正確性を担保するだけで良かったのに比べると、大きなプレッシャーです。
この記事では、開発・管理コストを削減するために、ReactiveXを使って各クライアントで実装を揃える事を提案・実演してみました。他に類似するものがないこの独特なサービス・アーキテクチャと上手く付き合ってゆくにあたって、この記事が幾らかの参考になれば嬉しいです。